JBoss Community Archive (Read Only)

Infinispan 5.0

Distributed Data Stream Processing Framework In Infinispan

Note: This is an update to Infinispan Distributed Execution Framework.

Infinispan as a source of distributed data processing

Infinispan distributes key-value entries across the grid.  Each node in the grid contains approximately same number of entries thanks to the consistent hash function.  Once the massive number of entries are distributed evenly across the grid, it is natural to run a distributed data processing task on it. Once well-abstracted API is defined, node affinity issue and multi-core utilization can be done behind the scene.

Ensuring node affinity

Once a data processing task is defined, the task is transferred to every node and executed by the node.  Since Infinispan distributes the entries evenly across the grid, node affinity issue is resolved automatically.  Each node feeds the input entries to the task.

Splitting, stealing, and taking-over the input entries

If the grid has replication turned on. more than one node can contain the same entry.  Running the same task on the same data would waste the resources, and thus the input entries for the task has to be split by the framework behind the scene.  Once split, approximately same number of non-overlapped input entries will be fed to the task on each replica.

Ideally, the split input entries doesn't need to be merged or re-split.  However, in reality, there are two cases that need to be considered:

The task might run faster on a certain replica while slower on some others even if the input entries were split evenly depending on how the task was implemented by user.  In such a case, the idle nodes could process the input entries which were not processed yet by the busy nodes (i.e. work stealing), only if the cost of work-stealing does not cancel the gain.

Also, during the task execution, a replica can go offline for some reason.  In such a case, other replicas have to take over the input entries that the offline replica was assigned to.  This is basically implemented in the same way with work stealing because we can consider the offline node as 'the busiest one'.

Utilizing multiple cores on a node

Once the input entries are fed to a node, they are split again to fully utilize all CPU cores.  Proper scheduling needs to be done so that only the same number of threads with the number of available cores run at the same time to avoid excessive context switching.  Work stealing also needs to be implemented to address the same problem mentioned above in a local level.

Task as a data stream pipeline and a building block

A task can be represented as a data processor whose input is a stream of the entries and whose output is also a stream of the transformed or aggregated entries (or an entry).  Exposing the data as a stream instead of a random-accessible map has the following advantages:

  • A user can chain more than one task to achieve more complex data processing without storing intermediary entries to the grid.

  • The framework can be reused for processing continuous queries, which is basically a stream of entries.

  • The framework can optimize the data access order based on its internal data structure.

The Task interface looks like the following:

public interface EntryStreamProcessor<K, V> {
   void processEntryStream(EntryStreamProcessorContext<K, V> ctx) throws Exception;
}

public interface EntryStreamProcessorContext<K, V> extends Iterable<Map.Entry<K, V>> {
   Map.Entry<K, V> read();
   void write(Object key, Object value);
   void write(Map.Entry<?, ?> entry);
   void write(Iterable<Map.Entry> entries);

   // Fetch the entries that are not part of the input.
   Iterable<Map.Entry<K, V>> query(/* TBD */);
}

The original GridFileSizeDistributionTask example could be re-written as follows:

public class GridFileQuery implements EntryStreamProcessor<Object, Object> {

   private final cacheName;

   public GridFileQuery(String cacheName) {
      this.cacheName = cacheName;
   }

   void processEntryStream<EntryStreamProcessorContext<Object, Object> ctx) {
      ctx.write(ctx); // Pass through (will do nothing if this is the first task (i.e. empty input))
      ctx.write(ctx.query("select all entries from %s", cacheName)); // query interface TBD
   }
}

public class GridFileSizeCounter implements EntryStreamProcessor<Object, GridFile.Metadata> {
   public void processEntryStream(EntryStreamProcessorContext<Object, GridFile.Metadata> ctx) {
      long sum = 0;
      for (Map.Entry<Object, GridFile.Metadata> e: ctx) {
         sum += e.getValue().getLength();
      }
      ctx.write("sum", sum);
   }
}

public class IntegerSummarizer implements EntryStreamProcessor<Object, Long> {

   private final String key;

   public IntegerSummarizer(String key) {
      this.key = key;
   }

   public void processEntryStream(EntryStreamProcessorContext<Object, Long> ctx) {
      long sum = 0;
      for (Map.Entry<Object, Long> e: ctx) {
         sum += e.getValue().longValue();
      }
      ctx.write(key, sum);
   }
}

// Now assemble the tasks into a larger task.
CacheManager cm = ...;
CoordinatedTask task = cm.newCoordinatedTask("finalOutcomeCacheName");
cm.addLocalTask(new GridFileQuery("my_gridfs"));          // Fetch
cm.addLocalTask(new GridFileSizeCounter());               // Map
cm.addGlobalTask(new IntegerSummarizer("my_gridfs_size"); // Reduce

// Execute the coordinated task.
Future<?> f = task.execute();
f.get();

// Get the result.
Long size = (Long) cm.getCache("finalOutcomeCacheName").get("my_gridfs_size");

The CoordinatedTask is serialized and transferred to all nodes in the grid when execute() is called by user.  In each node, the GridFileQuery fetches the input entries from the CacheManager via query.  Unless the query explicitly asks the CacheManager to fetch the remote entries, only the local entries are fetched.  The fetched entries are fed to the GridFileSizeCounter via a bounded pipe or a direct method invocation depending on the configuration.  Once all GridFileSizeCounter instances finishes summing up the file size, it is written to a bounded pipe.  The last task - IntegerSummarizer - is a global task, and therefore a single node is chosen to execute the last task and it processes all output entries generated by the previous task of all nodes.

Please note that each sub task can be reused to be part of a larger and more complex task.  For example, IntegerSummarizer could be reused in other numerical analysis and GridFileQuery could be reused in any GridFS-related tasks.

Support for various languages

A user should be able to describe and submit the task in various languages such as Ruby and Python.

Support for client-server mode and development environment

A user should be able to describe the task in one's development environment (e.g. Eclipse plugin) and submit the task to the grid even if the user's development environment is not part of the grid.

Security Manager

Properly implemented security manager should be employed for user created tasks to prevent possible abuse.

JBoss.org Content Archive (Read Only), exported from JBoss Community Documentation Editor at 2020-03-11 09:08:35 UTC, last content change 2010-11-30 11:04:54 UTC.